This notebook snippet is a brief use case showing how centroids derived from previous malware analysis can be used to build a classifier for new malware.
Our full dataset is described here: https://github.com/action-ai-institute/MABEL-dataset/tree/main
You can retrieve a cleaned version ready for modeling here: https://github.com/solomonsonya/Artificial_Intelligence_Research/blob/main/malware_classification/nlp/archive/nlp_MABEL_dataset/standardized_dataset/_malware_family_master_dataset.7z
This notebook relies on the standardized_import_functions attribute provided in the dataset.
print('configuring imports now...')
import threading
import pandas as pd
import time
import random
import os
from sklearn.metrics import *
from sklearn.feature_extraction.text import *
from tabulate import *
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import statsmodels.api as sm
import re
import logging
import inspect
import io
from plotly.subplots import make_subplots
from wordcloud import WordCloud
from pathlib import Path
import csv
from plotly.subplots import make_subplots
import plotly.graph_objects as go
from datetime import *
from scipy import sparse
from sklearn.metrics.pairwise import cosine_similarity, cosine_distances
import joblib
from sklearn.metrics import jaccard_score
from sklearn.preprocessing import MultiLabelBinarizer
from scipy.signal import find_peaks
from matplotlib import gridspec
from scipy.stats import mode
from scipy.stats import entropy
from sklearn.decomposition import PCA
from mpl_toolkits.mplot3d import Axes3D
import plotly.express as px
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score
from kneed import KneeLocator
import warnings
warnings.filterwarnings('ignore')
print('imports complete')
configuring imports now... imports complete
###############################################################################################################################
# Load the MABEL sample dataset
###############################################################################################################################
input_file_path = './input/pe_MABEL_api_imports_SAMPLE.csv'

print(f'reading MABEL snip file --> {input_file_path}')
df_original_input = pd.read_csv(input_file_path, sep=',')
print('read complete!')

###############################################################################################################################
# Column names and run settings used by the cells below
###############################################################################################################################
class_label_name = 'family_name'                     # column holding the class label
feature_name = 'standardized_import_functions'       # text feature that gets vectorized
feature_name_and_library_name = 'import_functions'
unique_instance_identifier = 'sha256_hash'           # per-sample identifier column
sort_feature_name = 'count_import_functions'
ACTIVE_THREAD_COUNT = 1

# the frame to analyze is the raw input itself (same object, not a copy)
df_to_analyze = df_original_input

###############################################################################################################################
# Single-threaded run bookkeeping
###############################################################################################################################
label = class_label_name

# distinct class labels present in the dataset
lst_unique_labels = list(df_to_analyze[label].unique())

# thread bookkeeping containers (not referenced again in the visible cells)
lst_ready_threads = []
dict_COMPLETED_threads = {}
lst_RUNNING_threads = []

###########################################################################################################
# Restrict the input to the columns listed below
###########################################################################################################
lst_initial_feature_names = [
    # identification / labelling
    'sha256_hash',
    'clam_av_scan_results',
    'yara_malware',
    'family_name',
    'sample_name',
    # additional hashes
    'md5_hash',
    'sha1_hash',
    'sha224_hash',
    'sha384_hash',
    'sha512_hash',
    'ssdeep',
    # file-level attributes
    'file_size',
    'binary_file_size',
    'time_stamp',
    'file_type_extension',
    'summary_detected_languages',
    'entropy(min=0.0; max=8.0)',
    'execution_section_name',
    'execution_size_bytes',
    # import-function features
    'import_functions',
    'standardized_import_functions',
    'count_import_functions',
    # packer / section attributes
    'peid',
    'yara_packer',
    'yara_rat',
    'code_sections',
    'code_section_sizes',
    'entropy_per_section',
    'number_sections',
]
df_input = df_original_input.filter(items=lst_initial_feature_names)
reading MABEL snip file --> ./input/pe_MABEL_api_imports_SAMPLE.csv read complete!
##########################################################################
# Centroid cell: read the master centroid file, vectorize its import-function
# text with TF-IDF, and write the pairwise distance and similarity matrices
# of the centroids to ./output/.
##########################################################################
path_MASTER_malware_family_centroids = './input/_malware_family_variant_centroids.csv'

##########################################################################
# read centroid
##########################################################################
print(f'reading in master centroid file at path --> {path_MASTER_malware_family_centroids}')
df_centroid = pd.read_csv(path_MASTER_malware_family_centroids, sep=',')
print(f'read complete. num rows imported: {len(df_centroid)}')

##########################################################################
# init
##########################################################################
class_label_name = 'family_name'
feature_name = 'standardized_import_functions'
feature_name_and_library_name = 'import_functions'
sort_feature_name = 'count_import_functions'
distance_metric = 'cosine'
instance_identifier_name = 'sha256_hash'

##########################################################################
# vectorize centroid file
##########################################################################
print(f'Vectorizing dataframe now...')
# the fitted vectorizer is reused later to transform the instance to classify
tf_idf_vectorizer = TfidfVectorizer(norm='l2')
mtx_tf_idf = tf_idf_vectorizer.fit_transform(df_centroid[feature_name])
print(f'vectorization complete. Calculating {distance_metric} pairwise distances now...')

# calculate pairwise distances between centroids using distance_metric
# (set to 'cosine' above -- these are NOT Euclidean distances)
distance_matrix = pairwise_distances(mtx_tf_idf, metric=distance_metric)

# convert mtx to dataframe for processing; rows and columns are both keyed by
# the centroid identifier (passing the Series for columns keeps its name on the axis)
df_distance_matrix = pd.DataFrame(
    distance_matrix,
    index=df_centroid[instance_identifier_name].tolist(),
    columns=df_centroid[instance_identifier_name],
)

# create output directory
os.makedirs('./output/', exist_ok=True)

# store raw output
df_distance_matrix.to_csv('./output/df_distance_matrix_centroid.csv', index=True)

########################################################################
# normalize distances and convert to similarity
########################################################################
# get the max value for normalization
max_distance = np.max(distance_matrix)

# avoid division by zero in the non-cosine branch when every distance is 0
# (same guard the single-instance classification cell applies)
if max_distance == 0:
    max_distance = 1

if distance_metric == 'cosine':
    # values from vectorizer are already L2 normalized, thus since non-negative values,
    # our cosine similarity is guaranteed to be between 0 and 1
    similarity_matrix = 1 - distance_matrix
else:
    # scale other metrics by the largest observed distance before inverting
    similarity_matrix = 1 - distance_matrix / max_distance

# convert matrix to dataframe
df_similarity_matrix = pd.DataFrame(
    similarity_matrix,
    index=df_centroid[instance_identifier_name].tolist(),
    columns=df_centroid[instance_identifier_name],
)

# store raw output
df_similarity_matrix.to_csv('./output/df_similarity_matrix_centroid.csv', index=True)

print('drawing interactive 3D chart now...')
jitter_value = 0.08
chart_title = f'Malware Family 3D Scatter Plot: PCA of Cosine Similarity Matrix with ({jitter_value}) jitter'
# NOTE: the chart itself is drawn by the next cell; the helper call it was derived from is kept for reference:
# ==> interactive_3d_scatter_plot(df_similarity_matrix, df_input, 'all_samples', 'cosine', './malware_family_centroids.html', jitter_value, False, 0.60, 'sha256_hash', 'intra_family_cosine_similarity_cluster', -1, chart_title)
print('\n\n\nCOMPLETE!!!!')
reading in master centroid file at path --> ./input/_malware_family_variant_centroids.csv read complete. num rows imported: 30 Vectorizing dataframe now... vectorization complete. Calculating cosine pairwise distances now... drawing interactive 3D chart now... COMPLETE!!!!
########################################################################
# PCA cell: project the centroid similarity matrix to 3 components with
# PCA, add random jitter, and render/save an interactive Plotly 3D scatter
# plot colored by malware family.
# Relies on df_centroid, df_similarity_matrix and chart_title from the
# previous cell.
########################################################################
cosine_similarity_cluster_threshold_percentage = 0.60
optimal_k = -1
instance_identifier = 'sha256_hash'
#df_original_dataset = df_input
cluster_feature_name = 'intra_family_cosine_similarity_cluster'
output_path = './output/malware_family_centroids.html'
distance_metric = 'cosine'
jitter_value = 0.08

##################################
# create unique index for each malware family mapping
# (adds a 'family_name_index' column to df_centroid in place)
##################################
unique_malware_family = df_centroid['family_name'].unique()
family_name_to_index = {family: idx for idx, family in enumerate(unique_malware_family)}
df_centroid['family_name_index'] = df_centroid['family_name'].map(family_name_to_index)

# notify
if cosine_similarity_cluster_threshold_percentage > -1:
    print(f'Commencing PCA of {distance_metric} similarity matrix into 3D scatter plot. Jitter is set to {jitter_value}. cosine_similarity_cluster_threshold_percentage is set to [{cosine_similarity_cluster_threshold_percentage}]...')
else:
    print(f'Commencing PCA of {distance_metric} into 3D scatter plot. Jitter is set to {jitter_value}. optimal_k is set to [{optimal_k}]...')

if optimal_k > 0:
    chart_title = str(chart_title) + '. Optimal k: [' + str(optimal_k) + ']'

########################################################################
# validate at least 3 instances exist for PCA to 3D
########################################################################
if len(df_similarity_matrix) < 3:
    # PCA(n_components=3) cannot be fitted on fewer than 3 rows, so stop here
    # instead of falling through to the fit and failing there
    print(f"PUNT! I can not proceed with PCA for this similarity matrix. I require at least 3 instances to continue. I was provided: [{len(df_similarity_matrix)}]")
else:
    ########################################################################
    # Perform PCA to reduce the dimensions to 3D for the scatter plot
    ########################################################################
    pca = PCA(n_components=3)
    pca_result = pca.fit_transform(df_similarity_matrix)

    # Add jitter to avoid overlap (unseeded, so point positions vary between runs)
    jitter = jitter_value * np.random.randn(*pca_result.shape)
    pca_result_jitter = pca_result + jitter

    # Create a DataFrame for the PCA results, keyed by the centroid identifier
    df_pca = pd.DataFrame({
        'PCA Component 1': pca_result_jitter[:, 0],
        'PCA Component 2': pca_result_jitter[:, 1],
        'PCA Component 3': pca_result_jitter[:, 2],
        instance_identifier: df_similarity_matrix.index
    })

    ########################################################################
    # merge df features with pca df
    ########################################################################
    # left-join the centroid attributes so they are available as hover data
    df_pca_with_features = df_pca.merge(df_centroid, on=instance_identifier, how='left')

    ########################################################################
    # Create a 3D scatter plot using Plotly
    ########################################################################
    fig = px.scatter_3d(
        df_pca_with_features,
        x='PCA Component 1',
        y='PCA Component 2',
        z='PCA Component 3',
        color='family_name',
        opacity=0.6,
        # True shows the column in the hover tooltip, False hides it
        hover_data={'sha256_hash': True,
                    'family_name': True,
                    'count_import_functions': True,
                    'clam_av_scan_results': True,
                    'yara_malware': True,
                    'file_size': True,
                    'entropy(min=0.0; max=8.0)': True,
                    'malware_variant_centroid': True,
                    #'time_stamp': True,
                    'PCA Component 1': False,
                    'PCA Component 2': False,
                    'PCA Component 3': False,
                    'family_name_index': False}
    )

    # Optional: Tight layout
    fig.update_layout(title=chart_title,
                      margin=dict(l=100, r=0, b=0, t=30),
                      width=1400,   # Adjust width
                      height=1000   # Adjust height
                      )

    ########################################################################
    # save the plot
    ########################################################################
    fig.write_html(output_path)
    fig.show()
    print(f"COMPLETE! Interactive 3D scatter plot saved to --> {output_path}")
Commencing PCA of cosine similarity matrix into 3D scatter plot. Jitter is set to 0.08. cosine_similarity_cluster_threshold_percentage is set to [0.6]...
COMPLETE! Interactive 3D scatter plot saved to --> ./output/malware_family_centroids.html
########################################################################
# Classification cell: classify one sample from df_input by finding the
# centroid with the smallest distance to it in the TF-IDF space fitted on
# the centroid file.
# Relies on df_input, df_centroid, tf_idf_vectorizer, mtx_tf_idf,
# feature_name, class_label_name, distance_metric and
# instance_identifier_name from the earlier cells.
########################################################################
# Instance identifier for classification
instance_indentifier_to_classify = 'b08ddfb984e88489a727d8124dcae1d3bca5c6ba5712a94c086277270912486e'

# Get the instance to classify
instance_to_classify = df_input[df_input['sha256_hash'] == instance_indentifier_to_classify]

# Fail early with a clear message instead of an obscure downstream error
# when the hash is not present in the dataset
if instance_to_classify.empty:
    raise ValueError(f'Instance [{instance_indentifier_to_classify}] was not found in df_input; nothing to classify.')

# The frames built below have exactly one row, so keep a single row even if
# the hash appears more than once in the dataset
instance_to_classify = instance_to_classify.head(1)

# Vectorize the instance using the same vectorizer that was fitted on the centroid file
sparse_mtx_instance_to_classify = tf_idf_vectorizer.transform(instance_to_classify[feature_name])

# Calculate distances (distance_metric, i.e. cosine here) between the new instance and all centroids
distance_to_centroids = pairwise_distances(sparse_mtx_instance_to_classify, mtx_tf_idf, metric=distance_metric)

# Convert distances to a DataFrame for easier handling: one row (the instance), one column per centroid
df_distances_instance_to_centroid = pd.DataFrame(
    distance_to_centroids,
    index=[instance_indentifier_to_classify],
    columns=df_centroid[instance_identifier_name],
)

# Find the closest centroid: idxmin on the single row returns the column label
# (the centroid's identifier) holding the minimum distance
closest_centroid_idx = df_distances_instance_to_centroid.iloc[0].idxmin()

# Get the closest centroid's details (including family name)
df_closest_centroid = df_centroid[df_centroid[instance_identifier_name] == closest_centroid_idx]

# Output the family name of the closest centroid
predicted_family = df_closest_centroid[class_label_name].values[0]
predicted_malware_family_variant = df_closest_centroid["malware_variant"].values[0]

# Notify
print(f'Predicted malware family class for instance: [{instance_indentifier_to_classify}] ==> {predicted_family} - Malware Family Variant: {predicted_malware_family_variant}')

########################################################################
# Normalize distances to similarity
########################################################################
# Get the max value for normalization
max_distance = np.max(distance_to_centroids)

# Avoid division by zero (in case max distance is 0)
if max_distance == 0:
    max_distance = 1

# Calculate similarity matrix based on distance metric
if distance_metric == 'cosine':
    # Cosine similarity is 1 - cosine distance
    similarity_matrix = 1 - distance_to_centroids
else:
    # Normalize other distances by dividing by max distance before inverting
    similarity_matrix = 1 - (distance_to_centroids / max_distance)

# Convert matrix to DataFrame for easier handling (use the instance identifier as the row index).
# NOTE: this rebinds df_similarity_matrix, replacing the centroid-to-centroid
# similarity matrix built by the earlier cell.
df_similarity_matrix = pd.DataFrame(
    similarity_matrix,
    index=[instance_indentifier_to_classify],
    columns=df_centroid[instance_identifier_name],
)
Predicted malware family class for instance: [b08ddfb984e88489a727d8124dcae1d3bca5c6ba5712a94c086277270912486e] ==> 7ev3n - Malware Family Variant: 7ev3n.0
df_distances_instance_to_centroid.T
| b08ddfb984e88489a727d8124dcae1d3bca5c6ba5712a94c086277270912486e | |
|---|---|
| sha256_hash | |
| 660cbfe94f7fd3c235662026d0aada4f10481357da647813bb5a86ab1a67c06e | 0.000000 |
| ecd73a863c8c81e02a130423b88cac9de51a307f5d5efc3645ad6a99142bbba5 | 1.000000 |
| 00cdee79a9afc1bf239675ba0dc1850da9e4bf9a994bb61d0ec22c9fdd3aa36f | 1.000000 |
| 52c356000529ec0927939a7374457f4386533efede771d71af737cad253369d8 | 0.476504 |
| 1490e74b93b40176975836156dc62210b7670ab5eb38f153a21cda8c72bebc76 | 0.889338 |
| c14f8bc656284715516f26935afe487a1d584f56ffabbcb98f2974f6ca6cd3a4 | 0.948900 |
| e2816883a7a514fe1a3fbce95c04c2fc735f0c7ab872f7c23978388c42aea5c2 | 1.000000 |
| 3791be67c1644b2291e4c96814f095621bc88b19909133fa7e727edb2cdfc539 | 0.777904 |
| da609d3211d60d5b11feaeaa717834cbe86e18103a1ed4fc09c2ee3e1cff9442 | 0.622153 |
| b72f85aadd5f84e641a265b3fcf3084c40477a4f158c7dc7134431bb4100b290 | 0.794484 |
| 86bcfce2dd342e9a1c04cfc65731d40ed1c397a4ec47bd9f5b41771297d81100 | 1.000000 |
| cd32a737fcba8198d43fed5a68348f983f7713f79574a710deb7759e5a1301eb | 0.941602 |
| 7a51bf0527aa3f38ee5a9ae52c1a4f63d67d68af2da7b488f8ba7b66d665e618 | 0.508391 |
| e59cc3a94f6a5119f36c4e0b3fbe6f04cc474d0b0b9d101163dac75722c809da | 0.899541 |
| cd8256d1c896a8de9ccf50e26f97106295f4e664aef6ba3ee0883420c01ac374 | 0.891654 |
| 7920fb3873012f1abef8c2abfb905e53317595874985f24b23fb318b54b1e243 | 0.695797 |
| 9af4b3b8c67d21fef69dee132cb686d1cb9e34e2d5e807b05c2a92e48f08dd39 | 0.951829 |
| a7cbeeba9fd5f17a1e5be18ea55db5727fe1c7f69471f7b28dae1887900d763b | 1.000000 |
| 3c8a5062da56c98a9405d638d9ad2b6200a3e1a8f9f6744652ac1b6b0cbe0c3f | 0.888471 |
| 9c15da4530235287957687016dafb1789d74fda7bae19162eb9a9140c0971690 | 0.485778 |
| 527691a32c0ee7ad0dc94ab9a19816b59e5222a0e40d60fe12e07d8d1fe096bd | 0.599975 |
| b3a99ecc4ab9f73d814f0f64a3aa0c71ee3cf94872f2f8ca3a2a1c5d630c095d | 0.791844 |
| e19e0e7f8871e7acf07a37b277b5aaf2aa125f28ee11644c335eb313ec768df5 | 0.728757 |
| d6255b4b18e6f07c4708cf6344163dfe3197cf403957bf3085a6a737bb37b038 | 0.738741 |
| a2578dfff2159337da456e1cf434e40f0d0b4b61a0c6e73480be25e40b9358f5 | 0.791753 |
| f94ec3342628661ca48da3fd62833aa4bfcd5966f570e44622607deed5f515b1 | 0.856785 |
| a2e642f7b896e12eb319a2dabcc274224db4d3b21be1eee3ed15eb96e0ba6d8f | 0.974868 |
| 9d94ebc7fb7e15a75b25aab7d5db3fadcf5ce5e83b8acbb7f2c3b4747415f279 | 0.796184 |
| d42d6f9becfb61dd82dbf20a65537ab4bc6fcf59ca1d3b6aae012929f5f36f46 | 0.881489 |
| 35b1aab3c0d05118473d4b7b1a8e3f25ef6015e4b6d4ab3efecb5b70fc6d609c | 0.708075 |
df_similarity_matrix.T
| b08ddfb984e88489a727d8124dcae1d3bca5c6ba5712a94c086277270912486e | |
|---|---|
| sha256_hash | |
| 660cbfe94f7fd3c235662026d0aada4f10481357da647813bb5a86ab1a67c06e | 1.000000 |
| ecd73a863c8c81e02a130423b88cac9de51a307f5d5efc3645ad6a99142bbba5 | 0.000000 |
| 00cdee79a9afc1bf239675ba0dc1850da9e4bf9a994bb61d0ec22c9fdd3aa36f | 0.000000 |
| 52c356000529ec0927939a7374457f4386533efede771d71af737cad253369d8 | 0.523496 |
| 1490e74b93b40176975836156dc62210b7670ab5eb38f153a21cda8c72bebc76 | 0.110662 |
| c14f8bc656284715516f26935afe487a1d584f56ffabbcb98f2974f6ca6cd3a4 | 0.051100 |
| e2816883a7a514fe1a3fbce95c04c2fc735f0c7ab872f7c23978388c42aea5c2 | 0.000000 |
| 3791be67c1644b2291e4c96814f095621bc88b19909133fa7e727edb2cdfc539 | 0.222096 |
| da609d3211d60d5b11feaeaa717834cbe86e18103a1ed4fc09c2ee3e1cff9442 | 0.377847 |
| b72f85aadd5f84e641a265b3fcf3084c40477a4f158c7dc7134431bb4100b290 | 0.205516 |
| 86bcfce2dd342e9a1c04cfc65731d40ed1c397a4ec47bd9f5b41771297d81100 | 0.000000 |
| cd32a737fcba8198d43fed5a68348f983f7713f79574a710deb7759e5a1301eb | 0.058398 |
| 7a51bf0527aa3f38ee5a9ae52c1a4f63d67d68af2da7b488f8ba7b66d665e618 | 0.491609 |
| e59cc3a94f6a5119f36c4e0b3fbe6f04cc474d0b0b9d101163dac75722c809da | 0.100459 |
| cd8256d1c896a8de9ccf50e26f97106295f4e664aef6ba3ee0883420c01ac374 | 0.108346 |
| 7920fb3873012f1abef8c2abfb905e53317595874985f24b23fb318b54b1e243 | 0.304203 |
| 9af4b3b8c67d21fef69dee132cb686d1cb9e34e2d5e807b05c2a92e48f08dd39 | 0.048171 |
| a7cbeeba9fd5f17a1e5be18ea55db5727fe1c7f69471f7b28dae1887900d763b | 0.000000 |
| 3c8a5062da56c98a9405d638d9ad2b6200a3e1a8f9f6744652ac1b6b0cbe0c3f | 0.111529 |
| 9c15da4530235287957687016dafb1789d74fda7bae19162eb9a9140c0971690 | 0.514222 |
| 527691a32c0ee7ad0dc94ab9a19816b59e5222a0e40d60fe12e07d8d1fe096bd | 0.400025 |
| b3a99ecc4ab9f73d814f0f64a3aa0c71ee3cf94872f2f8ca3a2a1c5d630c095d | 0.208156 |
| e19e0e7f8871e7acf07a37b277b5aaf2aa125f28ee11644c335eb313ec768df5 | 0.271243 |
| d6255b4b18e6f07c4708cf6344163dfe3197cf403957bf3085a6a737bb37b038 | 0.261259 |
| a2578dfff2159337da456e1cf434e40f0d0b4b61a0c6e73480be25e40b9358f5 | 0.208247 |
| f94ec3342628661ca48da3fd62833aa4bfcd5966f570e44622607deed5f515b1 | 0.143215 |
| a2e642f7b896e12eb319a2dabcc274224db4d3b21be1eee3ed15eb96e0ba6d8f | 0.025132 |
| 9d94ebc7fb7e15a75b25aab7d5db3fadcf5ce5e83b8acbb7f2c3b4747415f279 | 0.203816 |
| d42d6f9becfb61dd82dbf20a65537ab4bc6fcf59ca1d3b6aae012929f5f36f46 | 0.118511 |
| 35b1aab3c0d05118473d4b7b1a8e3f25ef6015e4b6d4ab3efecb5b70fc6d609c | 0.291925 |
def classify_malware(instance_indentifier_to_classify, arg_min, df_norm, search_metric, search_description, df_centroid):
    """Report which centroid(s) hold the best score for an instance.

    Args:
        instance_indentifier_to_classify: Identifier of the instance being
            classified; only used in the printed message.
        arg_min: If truthy, the best score is the smallest value in
            ``df_norm`` (use for distances); otherwise the largest value
            (use for similarities).
        df_norm: DataFrame of scores whose columns are centroid identifiers.
        search_metric: Name of the metric the scores were computed with
            (e.g. ``'cosine'``); used in the printed message.
        search_description: What the scores represent (e.g. ``'distance'``
            or ``'similarity'``); used in the printed message.
        df_centroid: DataFrame with ``sha256_hash`` and ``family_name``
            columns describing each centroid.

    Returns:
        List of column labels of ``df_norm`` that contain the best score
        (more than one when several centroids tie).

    Raises:
        IndexError: If a best-scoring centroid identifier has no matching
            ``sha256_hash`` row in ``df_centroid``.
    """
    # best score over the whole frame: min for distances, max for similarities
    if arg_min:
        search_value = df_norm.min().min()
    else:
        search_value = df_norm.max().max()

    # get list of centroid identifiers whose column contains the best score
    lst_centroids_at_best_search_distance = [
        col for col in df_norm.columns if search_value in df_norm[col].values
    ]

    for centroid_instance_identifier in lst_centroids_at_best_search_distance:
        # look up the centroid row to obtain its malware family name
        df_malware_family = df_centroid[df_centroid['sha256_hash'] == centroid_instance_identifier]
        pred_malware_family = df_malware_family.family_name.values[0]

        # notify: label the score with "<metric> <description>"
        # (previously the description was printed twice and the metric was ignored)
        print(f'Malware Instance: {instance_indentifier_to_classify} is closest to Malware Family: {pred_malware_family} with {str(search_metric).title()} {search_description.title()} == {search_value}')

    return lst_centroids_at_best_search_distance
# classify by smallest distance. df_distances_instance_to_centroid was computed with
# distance_metric ('cosine'), so pass that as the metric label rather than 'euclidean'.
# The result variable keeps its original name so later references still resolve.
lst_centroids_of_greatest_similarity_euclidean_distance = classify_malware(instance_indentifier_to_classify, True, df_distances_instance_to_centroid, distance_metric, 'distance', df_centroid)

# classify by largest similarity (df_similarity_matrix was derived from the same distances)
lst_centroids_of_greatest_similarity_cosine_similarity = classify_malware(instance_indentifier_to_classify, False, df_similarity_matrix, distance_metric, 'similarity', df_centroid)
Malware Instance: b08ddfb984e88489a727d8124dcae1d3bca5c6ba5712a94c086277270912486e is closest to Malware Family: 7ev3n with Distance Distance == 0.0 Malware Instance: b08ddfb984e88489a727d8124dcae1d3bca5c6ba5712a94c086277270912486e is closest to Malware Family: 7ev3n with Similarity Similarity == 1.0
The distance shows how far each centroid is from the test case. (Note: the distance matrix in this notebook is computed with `distance_metric = 'cosine'`, i.e., cosine distance; the same interpretation applies to Euclidean distance.) A larger number means the centroid is more dissimilar to the test case. A smaller number (approaching 0.0) indicates the centroid is very close (i.e., very similar) to the test case. A distance of 0.0 indicates the test case instance and the centroid are equivalent (i.e., identical) with respect to the vectorized features.
Regarding cosine similarity, a larger value (approaching 1.0) indicates the centroid is more similar to the test case. A value approaching 0.0 indicates the test case is completely dissimilar to the centroid. We select the largest similarity value here to identify the centroid that is most similar to the test case.
Happy Hunting!
-Solomon Sonya